[AWS IoT Core] 1秒間に17,000件のMQTTメッセージをpublishするテスト用のクライアントを作成してみました
1 はじめに
CX事業本部の平内(SIN)です。
AWS IoT CoreでMQTTのメッセージブローカーを利用したシステムを構築する際に、大量のメッセージに対応できているかどうかの確認が必要になる場合があると思います。
今回は、1秒間に17,000件のメッセージをPublishする、テスト用のクライアントを作成してみました。
ちなみに、17,000という数字は、使用したインスタンスで、1秒間に処理できる最大量で決まりました。もっと強いインスタンスを使用すれば、もしかすると、これ以上の数字も出せるのかも知れませんが、すいません、試していません。
2 構成
テスト用のクライアントは、Pythonで作成されており、EC2上で動作させています。
沢山のメッセージを送信をするために、一定のCPUやネットワークが必要なため、EC2のインスタンスは、m5.24xlargeを使用しています。
3 コード
テスト用のクライアントは、以下の通りです。
1秒ごとに1つのスレッドを起動し、スレッド中で、17,000件のpublishを行っています。
MQTTクライアントは、4,000個作成して接続し、順番に利用しています。後で紹介しますが、沢山のメッセージを送信するためには、多くの接続が必要です。
import json import time import math import datetime import threading from awscrt import io, mqtt from awsiot import mqtt_connection_builder # MQTTオブジェクト class Mqtt(): def __init__(self, endpoint, client_id, root_ca, cert, key): event_loop_group = io.EventLoopGroup(1) host_resolver = io.DefaultHostResolver(event_loop_group) client_bootstrap = io.ClientBootstrap(event_loop_group, host_resolver) self.__mqtt_connection = mqtt_connection_builder.mtls_from_path( endpoint = endpoint, cert_filepath = cert, pri_key_filepath = key, client_bootstrap = client_bootstrap, ca_filepath = root_ca, client_id = client_id, clean_session = False, keep_alive_secs = 6) connect_future = self.__mqtt_connection.connect() connect_future.result() print("Connected! {}".format(client_id)) def publish(self, topic, payload): self.__mqtt_connection.publish( topic = topic, payload = json.dumps(payload), qos = mqtt.QoS.AT_MOST_ONCE) # mqtt.QoS.AT_LEAST_ONCE def __del__(self): disconnect_future = self.__mqtt_connection.disconnect() disconnect_future.result() # MQTTクライアントの生成 def create_clients(client_max, client_id_prefix, endpoint, root_ca, cert, key): print("mqtt clinet_max:{} client_id_prefix:{}".format(client_max, client_id_prefix)) clients = [] for i in range(client_max): clients.append(Mqtt(endpoint,"{}_{:04d}".format(client_id_prefix, i), root_ca, cert, key)) return clients # payloadに乗せるデータの生成 def create_data(data_size): print("data_size:{}".format(data_size)) data = '' for i in range(data_size): data += 'X' return data RPS = 17000 PERIOD = 10 # 送信時間[sec] DATA_SIZE = 128 # byte CLIENT_ID_PREFIX = "XF" CLIENT_MAX = 4000 # 同一Clientの秒間制限があるため endpoint = "xxxxxxxxxxxxxxxx-ats.iot.ap-northeast-1.amazonaws.com" cert = "./certs/xxxxxxxx-certificate.pem.crt" key = "./certs/xxxxxxxx-private.pem.key" root_ca = "./certs/root-CA.crt" #topic = "load_test" # message broker topic = '$aws/rules/load_test/topic/load_test' # basic ingest # MATTクライアント生成 clients = create_clients(CLIENT_MAX, CLIENT_ID_PREFIX, endpoint, root_ca, cert, key) # payloadに乗せるデータの生成 data = create_data(DATA_SIZE) print("CLIENT_ID_PREFIX:{} CLIENT_MAX:{}".format(CLIENT_ID_PREFIX, CLIENT_MAX)) print("RPS:{} PERIOD:{} [sec]".format(RPS, PERIOD)) def worker(rps): counter = 0 start = time.time() for i in range(rps): payload = { "producer_timestamp": datetime.datetime.now().isoformat(sep=' ', timespec='milliseconds'), "counter": counter, "data": data } index = i % CLIENT_MAX clients[index].publish(topic, payload) counter += 1 elapsed_time = time.time() - start print ("counter: {} elapsed_time: {} sec".format(counter, elapsed_time)) time_interval = 1 for i in range(PERIOD): print("{} {}".format(i, datetime.datetime.now())) now = time.time() t = threading.Thread(target = worker(RPS)) t.setDaemon(True) t.start() t.join() wait_time = time_interval - ( (time.time() - now) % time_interval ) time.sleep(wait_time) print("wait...") time.sleep(1) # 送りきらないうちに終わってしまうのを防止する # MATTクライアント破棄 for i in range(CLIENT_MAX): del clients[0] print("finish!")
4 動作確認
上記のコードを実行すると、以下のような出力となります。各スレッドが、正確に1秒単位で実行され、各スレッドは、1秒以内に17,000件の送信を完了していることが分かります。
data_size:128 CLIENT_ID_PREFIX:XF CLIENT_MAX:4000 RPS:17000 PERIOD:10 [sec] 0 2021-06-05 00:54:00.875850 counter: 17000 elapsed_time: 0.901975154876709 sec 1 2021-06-05 00:54:01.876103 counter: 17000 elapsed_time: 0.8145761489868164 sec 2 2021-06-05 00:54:02.876444 counter: 17000 elapsed_time: 0.7705812454223633 sec 3 2021-06-05 00:54:03.876837 counter: 17000 elapsed_time: 0.7373974323272705 sec 4 2021-06-05 00:54:04.877293 counter: 17000 elapsed_time: 0.7749261856079102 sec 5 2021-06-05 00:54:05.877650 counter: 17000 elapsed_time: 0.7767212390899658 sec 6 2021-06-05 00:54:06.878021 counter: 17000 elapsed_time: 0.7741456031799316 sec 7 2021-06-05 00:54:07.878389 counter: 17000 elapsed_time: 0.76737380027771 sec 8 2021-06-05 00:54:08.878803 counter: 17000 elapsed_time: 0.7726783752441406 sec 9 2021-06-05 00:54:09.879194 counter: 17000 elapsed_time: 0.7592222690582275 sec wait... finish!
AWS IoT Coreを詳細ログを出力し、確認している様子です。
1秒ごとのPublish-In(status=Success)の件数を確認すると、概ね17,000/secのメッセージが到着していることを確認できます。
fields @timestamp, @message | sort @timestamp desc | filter clientId like /XF_.*/ | filter eventType = "Publish-In" and status = "Success" | stats count(eventType) by bin(1s)
また、失敗のログ「Publish-In(status=Failure)」は記録されていません。
fields @timestamp, @message | sort @timestamp desc | filter clientId like /XF_.*/ | filter eventType = "Publish-In" and status != "Success"
5 クライアント数
ここまで、対応可能なクライアント数で送信していたため、エラーは発生していませんが、クライアント数が不足した場合は、以下の様な状況となります。
例として、クライアント数を1とし、毎秒300件を5秒間だけ送信してみています。
RPS = 300 PERIOD = 5 # 送信時間[sec] DATA_SIZE = 128 # byte CLIENT_ID_PREFIX = "XJ" CLIENT_MAX = 1 # 同一 Clientでの秒間制限がある
Connected! XJ_0000 data_size:128 CLIENT_ID_PREFIX:XJ CLIENT_MAX:1 RPS:300 PERIOD:5 [sec] 0 2021-06-05 02:07:44.081579 counter: 300 elapsed_time: 0.9059906005859375 sec 1 2021-06-05 02:07:45.081736 counter: 300 elapsed_time: 0.9057602882385254 sec 2 2021-06-05 02:07:46.081907 counter: 300 elapsed_time: 0.9057824611663818 sec 3 2021-06-05 02:07:47.082070 counter: 300 elapsed_time: 0.9058058261871338 sec 4 2021-06-05 02:07:48.082242 counter: 300 elapsed_time: 0.9058108329772949 sec wait... finish!
到着数は、以下です。毎秒300件のはずが、全然足りていないことが分かります。
また、ERRORログも記録されており、reasonは、throttledとなっていました。
こちらは、AWS IoT Core メッセージブローカーの制限にある、「接続別の 1 秒あたりのパブリッシュリクエスト数」のようです。
https://docs.aws.amazon.com/ja_jp/general/latest/gr/iot-core.html
6 最後に
今回は、テスト用に使用するため、大量にpublishするクライアントを作成してみました。
ログに、Publish-InのERRORが出力された時、reasonが、throttledとなっていたので、少し戸惑ったのですが、今回試した程度の量であれば、接続数を増加させることで回避が可能のようでした。
扱いが簡単なようにと、1インスタンス、1アプリの形式で作業しましたが、これ以上となると、もしかすると、インスタンス数を増やす方向の方が簡単なのかも知れません。